import asyncio
import base64
import hashlib
import os
from collections.abc import Mapping
from unittest import mock

import pytest

import aiohttp
from aiohttp import (
    ClientConnectionResetError,
    ClientWSTimeout,
    ServerDisconnectedError,
    client,
    hdrs,
)
from aiohttp.http import WS_KEY
from aiohttp.http_websocket import WSMessageClose
from aiohttp.streams import EofStream


async def test_ws_connect(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """Connect against a mocked 101 handshake and check the chosen protocol.

    The mocked response advertises the "chat" subprotocol.  No ``origin`` is
    passed to ``ws_connect``, so the request headers must not carry Origin.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "chat",
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        # The patched request hands back an already-resolved future.
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect(
            "http://test.org", protocols=("t1", "t2", "chat")
        )

    assert isinstance(ws, client.ClientWebSocketResponse)
    assert ws.protocol == "chat"
    sent_headers = request_mock.call_args[1]["headers"]
    assert hdrs.ORIGIN not in sent_headers


async def test_ws_connect_read_timeout_is_reset_to_inf(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """A finite protocol read_timeout is None after a default ws_connect.

    The mocked protocol starts with ``read_timeout = 0.5`` and ``ws_connect``
    is called without a ``timeout`` argument.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "chat",
        },
    )
    protocol = handshake.connection.protocol
    protocol.read_timeout = 0.5
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect(
            "http://test.org", protocols=("t1", "t2", "chat")
        )

    assert isinstance(ws, client.ClientWebSocketResponse)
    assert ws.protocol == "chat"
    sent_headers = request_mock.call_args[1]["headers"]
    assert hdrs.ORIGIN not in sent_headers
    assert protocol.read_timeout is None


async def test_ws_connect_read_timeout_stays_inf(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """A protocol read_timeout of None stays None with ClientWSTimeout(0.5).

    The mocked protocol starts with ``read_timeout = None`` and ``ws_connect``
    receives ``timeout=ClientWSTimeout(0.5)``.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "chat",
        },
    )
    protocol = handshake.connection.protocol
    protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect(
            "http://test.org",
            protocols=("t1", "t2", "chat"),
            timeout=ClientWSTimeout(0.5),
        )

    assert isinstance(ws, client.ClientWebSocketResponse)
    assert ws.protocol == "chat"
    sent_headers = request_mock.call_args[1]["headers"]
    assert hdrs.ORIGIN not in sent_headers
    assert protocol.read_timeout is None


async def test_ws_connect_read_timeout_reset_to_max(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """A protocol read_timeout of 0.5 becomes 1.0 with ClientWSTimeout(1.0).

    The mocked protocol starts with ``read_timeout = 0.5`` and ``ws_connect``
    receives ``timeout=ClientWSTimeout(1.0)``.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "chat",
        },
    )
    protocol = handshake.connection.protocol
    protocol.read_timeout = 0.5
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect(
            "http://test.org",
            protocols=("t1", "t2", "chat"),
            timeout=ClientWSTimeout(1.0),
        )

    assert isinstance(ws, client.ClientWebSocketResponse)
    assert ws.protocol == "chat"
    sent_headers = request_mock.call_args[1]["headers"]
    assert hdrs.ORIGIN not in sent_headers
    assert protocol.read_timeout == 1.0


async def test_ws_connect_with_origin(
    key_data: bytes, loop: asyncio.AbstractEventLoop
) -> None:
    """The ``origin`` argument is sent as the Origin request header.

    The mocked response has status 403, so ``ws_connect`` is expected to
    raise WSServerHandshakeError; only the outgoing headers are inspected.
    """
    origin = "https://example.org/page.html"
    rejected = mock.Mock(status=403)
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(rejected)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError):
            await aiohttp.ClientSession().ws_connect("http://test.org", origin=origin)

    sent_headers = request_mock.call_args[1]["headers"]
    assert hdrs.ORIGIN in sent_headers
    assert sent_headers[hdrs.ORIGIN] == origin


async def test_ws_connect_with_params(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """``params`` given to ws_connect reach the underlying request call."""
    query = {"key1": "value1", "key2": "value2"}

    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "chat",
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        await aiohttp.ClientSession().ws_connect(
            "http://test.org", protocols=("t1", "t2", "chat"), params=query
        )

    request_kwargs = request_mock.call_args[1]
    assert request_kwargs["params"] == query


async def test_ws_connect_custom_response(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """ws_connect returns an instance of the session's ws_response_class."""

    class CustomResponse(client.ClientWebSocketResponse):
        # Marker method: only the custom subclass provides ``read``.
        def read(self, decode: bool = False) -> str:
            return "customized!"

    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession(
            ws_response_class=CustomResponse
        ).ws_connect("http://test.org")

    # TODO(PY313): Use TypeVar(default=) to make ClientSession Generic over response classes.
    #              Then .ws_connect() can return CustomResponse here.
    assert ws.read() == "customized!"  # type: ignore[attr-defined]


async def test_ws_connect_err_status(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """A 500 response fails the handshake with "Invalid response status"."""
    failing = mock.Mock(
        status=500,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(failing)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError) as excinfo:
            await aiohttp.ClientSession().ws_connect(
                "http://test.org", protocols=("t1", "t2", "chat")
            )

    assert excinfo.value.message == "Invalid response status"


async def test_ws_connect_err_upgrade(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """An Upgrade header of "test" fails with "Invalid upgrade header"."""
    failing = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "test",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(failing)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError) as excinfo:
            await aiohttp.ClientSession().ws_connect(
                "http://test.org", protocols=("t1", "t2", "chat")
            )

    assert excinfo.value.message == "Invalid upgrade header"


async def test_ws_connect_err_conn(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """A Connection header of "close" fails with "Invalid connection header"."""
    failing = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "close",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(failing)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError) as excinfo:
            await aiohttp.ClientSession().ws_connect(
                "http://test.org", protocols=("t1", "t2", "chat")
            )

    assert excinfo.value.message == "Invalid connection header"


async def test_ws_connect_err_challenge(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """A bogus Sec-WebSocket-Accept fails with "Invalid challenge response"."""
    failing = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: "asdfasdfasdfasdfasdfasdf",
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(failing)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError) as excinfo:
            await aiohttp.ClientSession().ws_connect(
                "http://test.org", protocols=("t1", "t2", "chat")
            )

    assert excinfo.value.message == "Invalid challenge response"


async def test_ws_connect_common_headers(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """Reuse a single headers dict for two consecutive ws_connect calls.

    The fake request derives Sec-WebSocket-Accept from the Sec-WebSocket-Key
    it actually receives, while the patched ``urandom`` returns ``key_data``.
    ``key_data`` is regenerated before the second connection, so that
    connection only succeeds if the newly generated key is sent to the
    server rather than a stale one.
    """
    shared_headers: dict[str, str] = {}

    async def fake_request(
        *args: object, headers: Mapping[str, str], **kwargs: object
    ) -> mock.Mock:
        # Build the accept value from the key found in the request headers.
        sent_key = headers[hdrs.SEC_WEBSOCKET_KEY]
        reencoded = base64.b64encode(base64.b64decode(sent_key))
        digest = hashlib.sha1(reencoded + WS_KEY).digest()
        accept = base64.b64encode(digest).decode()

        handshake = mock.Mock(
            status=101,
            headers={
                hdrs.UPGRADE: "websocket",
                hdrs.CONNECTION: "upgrade",
                hdrs.SEC_WEBSOCKET_ACCEPT: accept,
                hdrs.SEC_WEBSOCKET_PROTOCOL: "chat",
            },
        )
        handshake.connection.protocol.read_timeout = None
        return handshake

    async def connect_once() -> None:
        with (
            mock.patch("aiohttp.client.os") as os_mock,
            mock.patch(
                "aiohttp.client.ClientSession.request", side_effect=fake_request
            ) as request_mock,
        ):
            # Reads the enclosing ``key_data`` at call time, so the second
            # run picks up the regenerated value.
            os_mock.urandom.return_value = key_data

            ws = await aiohttp.ClientSession().ws_connect(
                "http://test.org",
                protocols=("t1", "t2", "chat"),
                headers=shared_headers,
            )

        assert isinstance(ws, client.ClientWebSocketResponse)
        assert ws.protocol == "chat"
        assert hdrs.ORIGIN not in request_mock.call_args[1]["headers"]

    await connect_once()
    # Generate a new ws key
    key_data = os.urandom(16)
    await connect_once()


async def test_close(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """close() calls writer.close(1000, b"") once and is idempotent.

    A close message is fed to the reader before the first ``close()``; the
    second ``close()`` must return a falsy value without calling the writer
    again.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        writer = mock.Mock()
        writer.close = mock.AsyncMock()
        writer_cls.return_value = writer

        session = aiohttp.ClientSession()
        ws = await session.ws_connect("http://test.org")
        assert not ws.closed

        ws._reader.feed_data(WSMessageClose(data=0, size=0, extra=""))

        first_result = await ws.close()
        writer.close.assert_called_with(1000, b"")
        assert ws.closed
        assert first_result  # type: ignore[unreachable]
        assert ws.exception() is None

        # idempotent
        second_result = await ws.close()
        assert not second_result
        assert writer.close.call_count == 1

        await session.close()


async def test_close_eofstream(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """An EofStream on the reader makes receive() close the websocket.

    After ``receive()`` the writer's ``close`` must have been called with
    ``(1000, b"")`` and the response must report ``closed``.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        writer = mock.Mock()
        writer_cls.return_value = writer

        session = aiohttp.ClientSession()
        ws = await session.ws_connect("http://test.org")
        assert not ws.closed

        ws._reader.set_exception(EofStream())

        await ws.receive()
        writer.close.assert_called_with(1000, b"")
        assert ws.closed

        await session.close()  # type: ignore[unreachable]


async def test_close_connection_lost(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """A ServerDisconnectedError on the reader yields a CLOSED message.

    Covers the connection being closed out from under the websocket client:
    ``receive()`` must return a CLOSED message and mark the response closed.
    """
    handshake = mock.Mock(spec_set=client.ClientResponse)
    handshake.status = 101
    handshake.headers = {
        hdrs.UPGRADE: "websocket",
        hdrs.CONNECTION: "upgrade",
        hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
    }
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter"),
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        session = aiohttp.ClientSession()
        ws = await session.ws_connect("http://test.org")
        assert not ws.closed

        ws._reader.set_exception(ServerDisconnectedError())

        received = await ws.receive()
        assert received.type is aiohttp.WSMsgType.CLOSED
        assert ws.closed

        await session.close()  # type: ignore[unreachable]


async def test_close_exc(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """An exception set on the reader is exposed by exception() after close()."""
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        writer = mock.Mock()
        writer.close = mock.AsyncMock()
        writer_cls.return_value = writer

        session = aiohttp.ClientSession()
        ws = await session.ws_connect("http://test.org")
        assert not ws.closed

        reader_error = ValueError()
        ws._reader.set_exception(reader_error)

        await ws.close()
        assert ws.closed
        assert ws.exception() is reader_error  # type: ignore[unreachable]

        await session.close()


async def test_close_exc2(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """Errors raised by writer.close during close() are handled per type.

    A ValueError from the writer leaves the response closed and is returned
    by ``exception()``; an ``asyncio.CancelledError`` propagates to the
    caller.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        writer = mock.Mock()
        writer_cls.return_value = writer

        ws = await aiohttp.ClientSession().ws_connect("http://test.org")
        assert not ws.closed

        writer_error = ValueError()
        writer.close.side_effect = writer_error

        await ws.close()
        assert ws.closed
        assert ws.exception() is writer_error  # type: ignore[unreachable]

        # Re-open the response by hand so close() runs a second time.
        ws._closed = False
        writer.close.side_effect = asyncio.CancelledError()
        with pytest.raises(asyncio.CancelledError):
            await ws.close()


@pytest.mark.parametrize("exc", (ClientConnectionResetError, ConnectionResetError))
async def test_send_data_after_close(
    exc: type[Exception],
    ws_key: str,
    key_data: bytes,
    loop: asyncio.AbstractEventLoop,
) -> None:
    """Every send method raises once the writer is flagged as closing.

    Parametrized over both exception classes so that each of them is shown
    to catch the error.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect("http://test.org")
        ws._writer._closing = True

        senders = (
            (ws.ping, ()),
            (ws.pong, ()),
            (ws.send_str, ("s",)),
            (ws.send_bytes, (b"b",)),
            (ws.send_json, ({},)),
            (ws.send_frame, (b"", aiohttp.WSMsgType.BINARY)),
        )
        for send, send_args in senders:
            with pytest.raises(exc):  # Verify exc can be caught with both classes
                await send(*send_args)


async def test_send_data_type_errors(
    ws_key: str, key_data: bytes, loop: asyncio.AbstractEventLoop
) -> None:
    """send_str, send_bytes and send_json raise TypeError on bad payloads."""
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending
        writer_cls.return_value = mock.Mock()

        ws = await aiohttp.ClientSession().ws_connect("http://test.org")

        # bytes where str is required
        with pytest.raises(TypeError):
            await ws.send_str(b"s")  # type: ignore[arg-type]
        # str where bytes is required
        with pytest.raises(TypeError):
            await ws.send_bytes("b")  # type: ignore[arg-type]
        # a set as the JSON payload
        with pytest.raises(TypeError):
            await ws.send_json(set())


async def test_reader_read_exception(
    ws_key: str, key_data: bytes, loop: asyncio.AbstractEventLoop
) -> None:
    """A ValueError set on the reader is returned by receive() as ERROR.

    The same exception object must afterwards be available from
    ``exception()``.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        writer = mock.Mock()
        writer.close = mock.AsyncMock()
        writer_cls.return_value = writer

        session = aiohttp.ClientSession()
        ws = await session.ws_connect("http://test.org")

        reader_error = ValueError()
        ws._reader.set_exception(reader_error)

        received = await ws.receive()
        assert received.type == aiohttp.WSMsgType.ERROR
        assert ws.exception() is reader_error

        await session.close()


async def test_receive_runtime_err(loop: asyncio.AbstractEventLoop) -> None:
    """receive() raises RuntimeError when ``_waiting`` is already set."""
    ws_timeout = ClientWSTimeout(ws_receive=10.0)
    # The first four constructor arguments are plain mocks in this test.
    ws = client.ClientWebSocketResponse(
        mock.Mock(),
        mock.Mock(),
        mock.Mock(),
        mock.Mock(),
        ws_timeout,
        True,
        True,
        loop,
    )
    ws._waiting = True

    with pytest.raises(RuntimeError):
        await ws.receive()


async def test_ws_connect_close_resp_on_err(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """The response is closed when the handshake fails on a 500 status."""
    failing = mock.Mock(
        status=500,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(failing)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError):
            await aiohttp.ClientSession().ws_connect(
                "http://test.org", protocols=("t1", "t2", "chat")
            )
        failing.close.assert_called_with()


async def test_ws_connect_non_overlapped_protocols(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """protocol is None when the server's protocols match none requested.

    The server answers with "other,another" while the client asked for
    "t1", "t2" and "chat".
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "other,another",
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect(
            "http://test.org", protocols=("t1", "t2", "chat")
        )

    assert ws.protocol is None


async def test_ws_connect_non_overlapped_protocols_2(
    ws_key: str, loop: asyncio.AbstractEventLoop, key_data: bytes
) -> None:
    """Non-overlapping protocols with an explicit force_close connector.

    Same handshake as the plain non-overlapping case, but the session uses
    ``TCPConnector(force_close=True)`` and the websocket is deleted at the
    end of the test.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_PROTOCOL: "other,another",
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        connector = aiohttp.TCPConnector(force_close=True)
        ws = await aiohttp.ClientSession(connector=connector).ws_connect(
            "http://test.org", protocols=("t1", "t2", "chat")
        )

    assert ws.protocol is None
    del ws


async def test_ws_connect_deflate(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """A bare "permessage-deflate" reply keeps the requested compress=15.

    ``client_notakeover`` must be False in that case.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_EXTENSIONS: "permessage-deflate",
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect("http://test.org", compress=15)

    assert ws.compress == 15
    assert ws.client_notakeover is False


async def test_ws_connect_deflate_per_message(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """Per-call ``compress`` values are forwarded to the writer's send_frame.

    send_str, send_bytes and send_json are expected to pass ``compress`` as
    a keyword argument; ``send_frame`` on the response passes it
    positionally.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_EXTENSIONS: "permessage-deflate",
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.WebSocketWriter") as writer_cls,
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        writer = mock.Mock()
        writer_cls.return_value = writer
        frame_sender = mock.AsyncMock()
        writer.send_frame = frame_sender

        session = aiohttp.ClientSession()
        ws = await session.ws_connect("http://test.org")

        await ws.send_str("string", compress=-1)
        frame_sender.assert_called_with(
            b"string", aiohttp.WSMsgType.TEXT, compress=-1
        )

        await ws.send_bytes(b"bytes", compress=15)
        frame_sender.assert_called_with(
            b"bytes", aiohttp.WSMsgType.BINARY, compress=15
        )

        await ws.send_json([{}], compress=-9)
        frame_sender.assert_called_with(b"[{}]", aiohttp.WSMsgType.TEXT, compress=-9)

        await ws.send_frame(b"[{}]", aiohttp.WSMsgType.TEXT, compress=-9)
        frame_sender.assert_called_with(b"[{}]", aiohttp.WSMsgType.TEXT, -9)

        await session.close()


async def test_ws_connect_deflate_server_not_support(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """Without a Sec-WebSocket-Extensions reply, compress ends up as 0.

    The client asks for ``compress=15``; ``client_notakeover`` must be False.
    """
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect("http://test.org", compress=15)

    assert ws.compress == 0
    assert ws.client_notakeover is False


async def test_ws_connect_deflate_notakeover(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """client_no_context_takeover in the reply sets client_notakeover True.

    ``compress`` must stay at the requested value of 15.
    """
    extensions = "permessage-deflate; " "client_no_context_takeover"
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_EXTENSIONS: extensions,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect("http://test.org", compress=15)

    assert ws.compress == 15
    assert ws.client_notakeover is True


async def test_ws_connect_deflate_client_wbits(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """client_max_window_bits=10 in the reply lowers compress from 15 to 10.

    ``client_notakeover`` must be False.
    """
    extensions = "permessage-deflate; " "client_max_window_bits=10"
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_EXTENSIONS: extensions,
        },
    )
    handshake.connection.protocol.read_timeout = None
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        ws = await aiohttp.ClientSession().ws_connect("http://test.org", compress=15)

    assert ws.compress == 10
    assert ws.client_notakeover is False


async def test_ws_connect_deflate_client_wbits_bad(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """client_max_window_bits=6 in the reply raises WSServerHandshakeError."""
    extensions = "permessage-deflate; " "client_max_window_bits=6"
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_EXTENSIONS: extensions,
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError):
            await aiohttp.ClientSession().ws_connect("http://test.org", compress=15)


async def test_ws_connect_deflate_server_ext_bad(
    loop: asyncio.AbstractEventLoop, ws_key: str, key_data: bytes
) -> None:
    """A "permessage-deflate; bad" reply raises WSServerHandshakeError."""
    handshake = mock.Mock(
        status=101,
        headers={
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
            hdrs.SEC_WEBSOCKET_EXTENSIONS: "permessage-deflate; bad",
        },
    )
    with (
        mock.patch("aiohttp.client.os") as os_mock,
        mock.patch("aiohttp.client.ClientSession.request") as request_mock,
    ):
        os_mock.urandom.return_value = key_data
        pending = loop.create_future()
        pending.set_result(handshake)
        request_mock.return_value = pending

        with pytest.raises(client.WSServerHandshakeError):
            await aiohttp.ClientSession().ws_connect("http://test.org", compress=15)
